Questions tagged [pyspark]

The Spark Python API (PySpark) exposes the apache-spark programming model to Python.

0
votes
1 answer
12 views

How to get Top N most similar items based on cosine similarity results in a dataframe?

I have a dataset of images (id, url, features) on which I computed cosine similarity between all pairs of images. The result is a PySpark dataframe with the following structure: >>> cos_df....
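A window ranked by similarity is the usual approach here. A minimal sketch, assuming the truncated cos_df has columns id_a, id_b, and sim (hypothetical names, since the excerpt cuts off):

```python
from pyspark.sql import Window
from pyspark.sql import functions as F

# Rank candidates per source image by descending similarity, keep the top 10.
w = Window.partitionBy("id_a").orderBy(F.col("sim").desc())
top_n = (cos_df
         .withColumn("rank", F.row_number().over(w))
         .filter(F.col("rank") <= 10)
         .drop("rank"))
```
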
0
votes
0 answers
9 views

Load XML files from a folder with Pyspark

I want to load XML files from a specific folder with PySpark, but I don't want to use the com.databricks.spark.xml package. Every example I find uses com.databricks.spark.xml. Is there any ...
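One package-free alternative is to read each file whole and parse it with the standard library. A sketch, assuming files small enough to fit in memory, an active SparkContext sc, and a hypothetical <record> layout with <id> and <name> children:

```python
import xml.etree.ElementTree as ET

def parse_xml(content):
    # Parse one XML document and emit (id, name) tuples for every <record>.
    root = ET.fromstring(content)
    return [(rec.findtext("id"), rec.findtext("name")) for rec in root.iter("record")]

# wholeTextFiles yields (path, content) pairs, one per file in the folder.
rows = sc.wholeTextFiles("/path/to/xml_folder/*.xml").flatMap(lambda kv: parse_xml(kv[1]))
df = rows.toDF(["id", "name"])
```
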
1
vote
2 answers
18 views

spark.executor.instances over spark.dynamicAllocation.enabled = True

I'm working on a Spark project using the MapR distribution, where dynamic allocation is enabled. Please refer to the parameters below: spark.dynamicAllocation.enabled true spark.shuffle....
0
votes
0 answers
10 views

Why does a PySpark UDF that operates on a column generated by rand() fail?

Given the following Python function: def f(col): return col If I turn it into a UDF and apply it to a column object, it works... from pyspark.sql import functions as F from pyspark.sql.types ...
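The workaround usually suggested for this (on Spark 2.3+) is to mark the UDF as nondeterministic, so the optimizer does not reorder or re-evaluate it against the rand() input. A sketch with the identity UDF from the question:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# asNondeterministic() tells the optimizer it may not re-evaluate this UDF freely.
f_udf = F.udf(lambda col: col, DoubleType()).asNondeterministic()
spark.range(5).select(f_udf(F.rand()).alias("out")).show()
```
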
-4
votes
0 answers
20 views

TextNow API creation

I would like to know if any Python developers would like to assist in reverse engineering and developing a client library for TextNow. I can successfully log in and send messages, but there have been ...
0
votes
0 answers
18 views

How to do a kinit inside Spark?

I would like to read two HDFS directories (/hdfs_path_1/ and /hdfs_path_2/), which have different user permissions, in Spark (cluster mode). I was able to read hdfs_path_1 because I spark-submit as user 1. ...
1
vote
0 answers
14 views

How to Add Additional Remote Repository to Zeppelin?

Using the following code in a Zeppelin note, I was able to add a repository and dependency. How would I accomplish the same using zeppelin.dep.additionalRemoteRepository? %dep z.addRepo("hortonworks")...
0
votes
0 answers
14 views

Is there any difference between temp view and temporary view?

I am learning SQL using pyspark-sql. I just wanted to check whether, in some conditions, the two statements given in the code, 'create or replace temporary view' and 'create or replace temp view',...
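In Spark's SQL grammar, TEMP is simply an alias for TEMPORARY, so the two statements behave identically. A quick check:

```python
df = spark.range(3)
df.createOrReplaceTempView("src")

# Both forms create an equivalent session-scoped view.
spark.sql("CREATE OR REPLACE TEMPORARY VIEW v1 AS SELECT * FROM src")
spark.sql("CREATE OR REPLACE TEMP VIEW v2 AS SELECT * FROM src")
spark.sql("SELECT * FROM v2").show()
```
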
1
vote
1 answer
17 views

Pyspark udf fails for a function with no arguments but works for a lambda with no arguments

I'm trying to add a column to my Spark DataFrame using withColumn and a udf that takes no arguments. This only seems to work if I use a lambda to encapsulate my original function. Here's an MWE: from ...
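A minimal sketch of the working lambda form the title describes (the wrapped function and return value are hypothetical, since the MWE is truncated):

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def get_value():
    return "hello"

# Wrapping the zero-argument function in a lambda, as the question found to work.
greet_udf = F.udf(lambda: get_value(), StringType())
spark.range(3).withColumn("greeting", greet_udf()).show()
```
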
1
vote
2 answers
38 views

Is spark persist() (then action) really persisting?

I always understood that persist() or cache(), followed by an action to trigger the DAG, will compute the result and keep it in memory for later use. A lot of threads here will tell you to cache to enhance the ...
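A minimal sketch of the pattern under discussion: persist, trigger an action, then reuse the materialized result:

```python
from pyspark import StorageLevel
from pyspark.sql import functions as F

df = spark.range(1000000).withColumn("x", F.rand())
df.persist(StorageLevel.MEMORY_ONLY)
df.count()                   # action: materializes the data into the cache
df.agg(F.sum("x")).show()    # served from the cached result, not recomputed
```
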
-1
votes
0 answers
22 views

Applying a user-defined function on an RDD returns an error. How to apply a UDF on an RDD?

I have a Spark dataset with several columns in this format. I have defined a function that uses the data from a subset of the columns to create 2 new columns. #Assume all the ...
1
vote
2 answers
21 views

Can't fix JSONDecodeError: Invalid control character

I have a json file which contains the following content: #create test.json and add content dbutils.fs.put("test.json",'{"type": "abc","project_id": "abc","private_key_id": "123","private_key": "-----...
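The private key in such files contains literal newline control characters, which the default JSON parser rejects. One common fix is strict=False; a sketch, assuming Databricks' /dbfs local mount for the file written above:

```python
import json

# strict=False permits control characters (e.g. raw newlines) inside strings.
with open("/dbfs/test.json") as fh:
    conf = json.loads(fh.read(), strict=False)
```
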
-1
votes
0 answers
14 views

Is there a PySpark function for a raise of 0.20 (or 20%)?

Is there a PySpark function for a raise of 0.20 (or 20%)? I have been trying with this code; is it right or am I wrong? df = df.withColumn("Raise", df.Salary * 0.20)
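The posted line computes only the amount of the raise; for the salary after the raise, multiply by 1.20. A sketch:

```python
from pyspark.sql import functions as F

df = df.withColumn("Raise", F.col("Salary") * 0.20)        # the raise amount
df = df.withColumn("NewSalary", F.col("Salary") * 1.20)    # salary after the 20% raise
```
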
0
votes
0 answers
7 views

How do I use an HDFS file with Wand for image conversion using PySpark - Hadoop HDP 2.6.6

I am trying to convert PDF files to images and then use pytesseract to OCR the files. I was able to do it successfully on files present in the local Linux path, but not with an HDFS path. ...
1
vote
0 answers
26 views

Spark array of arrays: how to extract first element of each subarray (structtype)? [duplicate]

I have a Spark structured streaming dataframe column that contains an array of arrays, like: [[ts1, url1], ... , [tsN, urlN]] where each subarray is a struct: schema_visits = StructType( fields ...
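On Spark 2.4+, the transform higher-order function can pull a struct field out of every array element without a UDF. A sketch, with the column name visits and field name ts assumed from the excerpt:

```python
from pyspark.sql import functions as F

# Extract the ts field from each struct in the array column.
df = df.withColumn("timestamps", F.expr("transform(visits, v -> v.ts)"))
```
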
2
votes
0 answers
28 views

Accessing broadcast variables in user defined function (udf) in separate files

I have a broadcast variable set up in a separate .py file, which I then import in a file that contains my UDFs. But when I try to use this variable in a UDF, I see that the broadcast variable is not ...
0
votes
1 answer
26 views

Hive table is read multiple times when using spark.sql and union

I have a single Hive table that is used in multiple subsequent spark.sql queries. Each stage shows a HiveTableScan, which is not necessary as the table only needs to be read once. How can I avoid ...
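The standard fix is to read the table once, cache it, and point every query at the cached DataFrame. A sketch with hypothetical table and column names:

```python
src = spark.table("mydb.events").cache()
src.createOrReplaceTempView("events_cached")

# Both branches now read the cached data: one HiveTableScan instead of two.
a = spark.sql("SELECT * FROM events_cached WHERE year = 2019")
b = spark.sql("SELECT * FROM events_cached WHERE year = 2018")
result = a.union(b)
```
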
0
votes
1 answer
13 views

Pyspark error on creating dataframe: 'StructField' object has no attribute 'encode'

I'm facing a little issue when creating a dataframe: from pyspark.sql import SparkSession, types spark = SparkSession.builder.appName('test').getOrCreate() df_test = spark.createDataFrame( ['a ...
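That error typically means a bare StructField (or a list of them) was passed where createDataFrame expected column-name strings or a full StructType. A sketch of the working form:

```python
from pyspark.sql import SparkSession, types

spark = SparkSession.builder.appName('test').getOrCreate()

# Wrap the fields in a StructType rather than passing StructFields directly.
schema = types.StructType([types.StructField('value', types.StringType(), True)])
df_test = spark.createDataFrame([('a',), ('b',)], schema)
df_test.show()
```
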
2
votes
0 answers
20 views

RDD repartition doesn't seem to shuffle when increasing partitions? [duplicate]

Spark v2.4 spark.range(5, numPartitions=1).rdd.keys().repartition(7).glom().collect() # [[], [], [], [], [], [], [0, 1, 2, 3, 4]] # 2 partitions initially spark.range(5, numPartitions=2).rdd.keys()....
-1
votes
1 answer
13 views

How to create a dataframe from a dict column in another dataframe?

I have a column in a Spark dataframe. Output from df.select('parsed').show(): +--------------------+ | parsed| +--------------------+ |{Action Flags=I, ...| |{Action Flags=I, ...| |{...
0
votes
0 answers
11 views

pyspark - Fastest way of writing a dataframe to CSV

I am currently writing the dataframe to CSV using write.csv: df.withColumn("x", col("x").cast("string")).write.csv(path="mycsv",sep=";") But it is very slow when writing millions and millions of ...
1
vote
0 answers
23 views

Spark Add Meta Data to Parquet / ORC Files

On a daily basis, I need to summarise the activities of customers for various use cases. In order to maintain / not overwrite, I make sure that every time I am writing to parquet (partitioned by date) ...
-1
votes
0 answers
4 views

I am getting the following error when running a PySpark Kafka consumer program

Spark Streaming's Kafka libraries not found in class path. Try one of the following. Include the Kafka library and its dependencies with in the spark-submit command as $ bin/spark-submit --packages ...
0
votes
1 answer
26 views

How to save a Spark dataframe with a different table name on each iteration using saveAsTable in pyspark

Platform: RHEL 7, Cloudera CDH 6.2 Hadoop distribution, pyspark 3.7.1. What I tried: I could write a table to the Hive warehouse when I explicitly mention the table name in saveAsTable("tablename"). But, ...
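saveAsTable takes an ordinary string, so the name can be built per iteration. A sketch with hypothetical loop values and column names:

```python
for region in ["us", "eu", "apac"]:
    subset = df.filter(df.region == region)
    subset.write.mode("overwrite").saveAsTable("mydb.sales_{}".format(region))
```
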
0
votes
0 answers
14 views

How to execute PySpark code in a remote cluster?

I'm using a remote Spark cluster with YARN. I'm trying to execute this code in Windows and send the code to be executed in the Spark cluster: from pyspark import SparkContext, SparkConf from operator ...
0
votes
0 answers
14 views

Input path doesn't exist in PySpark for a Hadoop path

I am trying to fetch a file from HDFS in PySpark using Visual Studio Code... I have checked through jps that all the nodes are in active status. My file path in Hadoop is hadoop fs -cat emp/...
0
votes
0 answers
9 views

Class org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem not found

I am trying to dockerize an edge node for my HDInsight Spark cluster on the Azure cloud. To test it out, I am running a simple PySpark job where I read data from SQL Server into a dataframe, print the ...
0
votes
0 answers
17 views

Real-Time Kafka Data Ingestion into HBase via PySpark - java.lang.NoSuchMethodError: hbase.client.Put.add([B[B[B)Lorg/apache/hadoop/hbase/client/Put;

I am trying to configure real-time Kafka data ingestion into HBase via PySpark, in accordance with this tutorial. I have a problem with the code shown below. At the moment I am just trying to add data to the HBase ...
2
votes
3 answers
74 views

Why doesn't Spark's repartition balance data into partitions?

>>> rdd = sc.parallelize(range(10), 2) >>> rdd.glom().collect() [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]] >>> rdd.repartition(3).glom().collect() [[], [0, 1, 2, 3, 4], [5, 6, 7, 8, ...
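repartition distributes each input partition's elements round-robin over the targets, so with only two small input chunks the result can look lopsided; with more data and more input partitions the counts even out. A quick check:

```python
rdd = sc.parallelize(range(100), 10)
# Expect three roughly equal partition sizes, unlike the tiny 2-partition case.
print(rdd.repartition(3).glom().map(len).collect())
```
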
1
vote
1 answer
31 views

Spark 2.1.1: How to predict topics in unseen documents with an already trained LDA model?

I am training an LDA model in pyspark (Spark 2.1.1) on a customer review dataset. Now, based on that model, I want to predict the topics in new unseen text. I am using the following code to make ...
0
votes
1 answer
12 views

Where does Databricks Delta store its metadata?

Hive stores its metadata in an external database like SQL Server. Similarly, where does Databricks Delta store its metadata information?
0
votes
0 answers
12 views

How to plot a Python XGBoost decision tree on Databricks

I am having an issue plotting an XGBoost decision tree on Databricks. XGBoost is installed for Python, which makes things a bit weird when working in this environment. import xgboost as xgb from xgboost ...
0
votes
0 answers
26 views

Pyspark running in cluster mode fails to resolve jars

I am trying to submit a PySpark app to a Kubernetes cluster via spark-submit. The PySpark job has some dependencies, which I specify via Maven coordinates like so: spark-submit --master k8s://http:...
0
votes
0 answers
13 views

Writing a dataframe into SQL Server 2017 gives the exception: Column FirstName has a data type that cannot participate in a columnstore index

In Databricks I am trying to write a dataframe to a SQL data warehouse using the JDBC connector. I am not using a blob container as an intermediate store during read and write, so there is a direct connection ...
0
votes
0 answers
25 views

AWS Glue : java.lang.UnsupportedOperationException: CSV data source does not support binary data type

I'm trying to implement upsert with AWS Glue and Databricks using preactions and postactions. Here is the sample code below: sample_dataframe.write.format("com.databricks.spark.redshift")\ .option("url", "...
0
votes
1 answer
18 views

AttributeError: 'NoneType' object has no attribute 'persist'

When I try to persist a Dataframe in pyspark, I encounter the error AttributeError: 'NoneType' object has no attribute 'persist'. Pseudo code is as follows: ss = SparkSession.builder.getOrCreate() sqlDF = ...
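That error means sqlDF is None rather than a DataFrame, usually because it was assigned from a call that returns nothing (df.show(), or a helper missing a return). A sketch of the distinction:

```python
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()
sqlDF = ss.sql("SELECT 1 AS x")   # returns a DataFrame: persist() works
sqlDF.persist()

result = sqlDF.show()             # show() returns None...
# result.persist()                # ...so this would raise the AttributeError
```
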
0
votes
1 answer
20 views

How to split a column that contains multiple key-value pairs into different columns in pyspark

I am working on a very large dataset called Reddit on AWS. I have read the data in RDD form and it looks like this (the real data has more key-value pairs): rdd = [(0, {"author": abc, "id": 012, "...
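One route is to turn each (index, dict) pair into a Row and let Spark infer the schema. A sketch, with field names taken from the excerpt:

```python
from pyspark.sql import Row

# ** unpacks the dict so every key becomes its own column.
df = rdd.map(lambda kv: Row(idx=kv[0], **kv[1])).toDF()
df.select("author", "id").show()
```
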
2
votes
1 answer
14 views

How to output column values from a pyspark dataframe into a string?

I'm working with a dataset and want to create a textblob of all values of a particular column called 'text'. I tried the following methods: xp = positive.select("text").collect().map(_(0)).toList #...
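The posted line is Scala; the PySpark equivalent is to collect the rows and join their values. A sketch:

```python
rows = positive.select("text").collect()   # list of Row objects
blob = " ".join(row.text for row in rows)
```
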
0
votes
1 answer
38 views

Spark filter not working as expected: 'Column' object is not callable

When using the "and" clause in filter on Spark Dataframe, it returns Spark.SQL.Column instead of Spark Dataframe. But for one condition it works fine. How to show() or iterate through Spark Sql ...
0
votes
0 answers
26 views

PySpark Structured Streaming with Kafka

I am trying to run Spark structured streaming code that reads from Kafka. On my structured stream I am doing a group-by operation and trying to print the value like this: spark = SparkSession....
1
vote
2 answers
40 views

Spark Scala: update dataframe column's value from another dataframe

a = +------------+------------+------+ | Name| Nationality|Salary| +------------+------------+------+ | A. Abbas| Iraq| €2K| | A. Abdallah| France| €1K| |A. Abdennour| ...
0
votes
0 answers
11 views

Pyenv seems to cause the “wrong” spark executable to run. How to fix it?

I cannot seem to get pyspark working nicely with pyenv. Running pyspark whilst a virtualenv is activated fails. Calling the "right" binary directly seems to work. (anaconda3-5.3.0) ➜ ~ which pyspark ...
0
votes
0 answers
12 views

Summing a dataframe's column of dense vectors

I'm looking to sum a dataframe's column where every row has a dense vector of the same dimension. The output I would like is a numpy array of that dimension. Using the ml.feature.VectorAssembler, I ...
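One way to get a numpy array of the same dimension is to aggregate on the RDD side. A sketch, assuming the assembled column is named features:

```python
# Each DenseVector converts to a numpy array; reduce adds them element-wise.
total = (df.select("features").rdd
           .map(lambda row: row.features.toArray())
           .reduce(lambda a, b: a + b))
print(total)   # numpy.ndarray of the vector dimension
```
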
0
votes
1 answer
24 views

Write to Google Cloud Storage using Spark to an absolute path

I am trying to write a Spark dataframe into Google Cloud Storage. This dataframe has some updates, so I need a partitioning strategy. So I need to write it into an exact file in GCS. I have created a ...
0
votes
0 answers
20 views

Data Locality and Block Size [on hold]

We are running Spark with the data on AWS S3 (note: it's not EMR, but Databricks/Qubole). What is the data locality here? I am assuming there is no data locality. In Hadoop the block size is 128 MB and ...
0
votes
1 answer
19 views

Pyspark - Filter RDD With Dates in Broadcast Dictionary

I have a Python dictionary that I broadcast, which contains date filters by user. nested_filter = {"user1":"2018-02-15"} b_filter = sc.broadcast(nested_filter) I want to use this broadcast variable ...
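A sketch of using the broadcast value inside the filter function, assuming records shaped like (user, date_string); ISO-formatted date strings compare correctly as plain strings:

```python
def keep(record):
    user, date_str = record
    cutoff = b_filter.value.get(user)          # look up this user's date filter
    return cutoff is not None and date_str >= cutoff

filtered = rdd.filter(keep)
```
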
2
votes
3 answers
49 views

Subset one array column with another (boolean) array column

I have a Dataframe like this (in Pyspark 2.3.1): from pyspark.sql import Row my_data = spark.createDataFrame([ Row(a=[9, 3, 4], b=['a', 'b', 'c'], mask=[True, False, False]), Row(a=[7, 2, 6, 4], ...
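On 2.3.x (before arrays_zip and the filter higher-order function arrive in 2.4), a UDF is the straightforward route: zip each array with its mask and keep the flagged elements. A sketch for the integer column a:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

subset = F.udf(lambda xs, mask: [x for x, keep in zip(xs, mask) if keep],
               ArrayType(IntegerType()))
my_data = my_data.withColumn("a_masked", subset("a", "mask"))
```
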
0
votes
1 answer
39 views

How to fix the below issue while creating a derived column in pyspark?

I am trying to do binning on a particular column in a dataframe based on the data given in a dictionary. Below is the dataframe I used: df SNo,Name,CScore 1,x,700 2,y,850 3,z,560 4,a,578 5,b,456 6,...
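If the dictionary boils down to split points, ml.feature.Bucketizer does the binning without a UDF. A sketch with hypothetical splits over the CScore column from the excerpt:

```python
from pyspark.ml.feature import Bucketizer

splits = [float("-inf"), 500, 700, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="CScore", outputCol="bin")
binned = bucketizer.transform(df)
```
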
0
votes
0 answers
42 views

Why does repartition not take effect on a huge PySpark dataframe?

I have 10 nodes with 32 cores and 125 GB each. I also have a dataframe called oldEmployee with two columns, employeName and salary. df = .. oldEmployee = df.rdd.map(lambda item:....) ...
-1
votes
0 answers
19 views

How to correct a null error when writing a dataframe in ORC format using pyspark?

I want to write a dataframe in ORC format to a file on S3. My Spark version is 2.2.1 (AWS Glue). I have been able to write normal dataframes in ORC format very well. However, the current dataframe ...
